/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.constant.ConfigPath
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model.Schema
import com.typesafe.config.Config
import io.circe.generic.auto._
import io.circe.syntax._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.slf4j.LoggerFactory
import scalaj.http.Http

/**
 * Flink sink that turns a crawled [[Schema]] into Hive DDL and submits it to the
 * metadata service over HTTP.
 *
 * For every incoming schema the sink builds the create-table statement (followed by the
 * add-partition statement when the schema provides one, separated by `;`) and POSTs it
 * as JSON to the `sqlExecuteProxy` endpoint.
 *
 * @param jobConfig job configuration; read for the metadata base URL, the maximum
 *                  allowed column count, the user id and the job id
 */
case class SimpleHttpSink(jobConfig: Config) extends SinkFunction[Schema] {

  /** Full endpoint URL: the configured metadata base URL plus `/user/sqlExecuteProxy`. */
  val url: String =
    jobConfig.getString(ConfigPath.METADATA_URL) + "/user/sqlExecuteProxy"

  // Sink functions are serialized when the job graph is shipped to the workers.
  // Keeping the logger transient and lazy leaves it out of the serialized form and
  // re-creates it on first use after deserialization, independent of the log binding.
  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  /**
   * Builds the SQL for `schema` and sends it to the metadata service.
   *
   * @param schema  table description to register
   * @param context sink context supplied by Flink (not used)
   * @throws RuntimeException if the schema has more columns than the configured maximum
   */
  override def invoke(schema: Schema, context: SinkFunction.Context): Unit = {
    logger.info("sink table {} start", schema.name)
    logger.info("sink url {}", url)

    // Reject over-wide tables before issuing any request. The limit is inclusive:
    // a schema with exactly `maxColumn` columns is accepted.
    val maxColumn = jobConfig.getInt(ConfigPath.MAX_COLUMN)
    val columnCount = schema.columns.length
    if (columnCount > maxColumn) {
      throw new RuntimeException(
        s"Failed to create table with $columnCount column, please check file content. " +
          s"Only support create table with column less than $maxColumn")
    }

    // Create statement first; the optional add-partition statement is appended after ';'.
    val alterSql = schema.toAddPartitionSql()
    val createSql = schema.toHiveCreateSql()
    val sql = alterSql
      .map(partitionSql => s"$createSql;$partitionSql")
      .getOrElse(createSql)

    val user = jobConfig.getString(ConfigPath.USER)
    val jobId = jobConfig.getString(ConfigPath.JOB_ID)
    logger.info("send sql:{}", sql)

    val postData = SqlExecuteContextDto(sql, jobId)
    val response = Http(url)
      .header("Content-Type", "application/json;charset=UTF-8")
      .header("User-Id", user)
      .header("Request-Id", jobId)
      .postData(postData.asJson.toString())
      .asString

    if (response.is2xx) {
      logger.info("sink table {} success", schema.name)
    } else {
      // NOTE(review): a non-2xx answer is only logged, so the job keeps running even
      // though the table was not registered — confirm this best-effort policy is intended.
      logger.error(
        s"sink table ${schema.name} failure due to ${response.body}"
      )
    }
  }

}

/**
 * Request payload for the metadata service's SQL execution endpoint.
 *
 * @param sql         SQL text to execute
 * @param taskId      identifier of the task issuing the request
 * @param refTaskType task type tag; defaults to `"metadata_crawler_task"`
 */
case class SqlExecuteContextDto(
  sql: String,
  taskId: String,
  refTaskType: String = "metadata_crawler_task"
)
